package com.huan.table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object TableAPITest {

  //TODO 定义表的来源
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)
    //定义表的来源,和外部进行连接
    val path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt"
    tableEnv.connect( new FileSystem().path(path))  //可以放kafka,redis,es,等等流数据
      .withFormat(new Csv()) //定义数据格式化方法
      .withSchema( new Schema() //定义表结构
        .field("id", DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")//创建出来一张表

    val inputTable : Table = tableEnv.from("inputTable") //上面创建的表要与下面的from下的表相同


    //从kafka 中连接
    /**
    tableEnv.connect( new Kafka()
        .version("0.11")
        .topic("sensor")
        .property("zookeeper.connect","localhost:2181")
        .property("bootstrap.servers","localhost:9092")
    )  //可以放kafka,redis,es,等等流数据
      .withFormat(new Csv()) //定义数据格式化方法
      .withSchema( new Schema() //定义表结构
        .field("id", DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")//创建出来一张表
    val KafkaInputTable : Table = tableEnv.from("inputTable") //上面创建的表要与下面的from下的表相同

     */


    inputTable.toAppendStream[(String,Long,Double)].print()

    env.execute("TableAPITest")
  }
}
